home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) import cups cups.require('1.9.42') import dbus import dbus.glib as dbus import gobject import time from debug import * import pprint _ = lambda x: x def set_gettext_function(x): _ = x import statereason from statereason import StateReason statereason.set_gettext_function(_) CONNECTING_TIMEOUT = 60 MIN_REFRESH_INTERVAL = 1 def state_reason_is_harmless(reason): if reason.startswith('moving-to-paused') and reason.startswith('paused') and reason.startswith('shutdown') and reason.startswith('stopping') or reason.startswith('stopped-partly'): return True return False def collect_printer_state_reasons(connection): result = { } try: printers = connection.getPrinters() except cups.IPPError: return result for name, printer in printers.iteritems(): reasons = printer['printer-state-reasons'] for reason in reasons: if reason == 'none': break if state_reason_is_harmless(reason): continue if not result.has_key(name): result[name] = [] result[name].append(StateReason(name, reason)) return result class Watcher: def monitor_exited(self, monitor): debugprint(repr(monitor) + ' exited') def state_reason_added(self, monitor, reason): debugprint(repr(monitor) + ': +' + repr(reason)) def state_reason_removed(self, monitor, reason): debugprint(repr(monitor) + ': -' + repr(reason)) def still_connecting(self, monitor, reason): debugprint(repr(monitor) + ": `%s' still connecting" % reason.get_printer()) def now_connected(self, monitor, printer): debugprint(repr(monitor) + ": `%s' now connected" % printer) def current_printers_and_jobs(self, monitor, printers, jobs): debugprint(repr(monitor) + ': printers and jobs lists provided') def job_added(self, monitor, jobid, eventname, event, jobdata): debugprint(repr(monitor) + ': job %d added' % jobid) def job_event(self, monitor, jobid, eventname, event, jobdata): debugprint(repr(monitor) + ": job %d has event `%s'" % (jobid, eventname)) def job_removed(self, monitor, jobid, eventname, event): debugprint(repr(monitor) + ': job %d removed' % jobid) def printer_added(self, monitor, printer): debugprint(repr(monitor) + ": printer `%s' added" % printer) def printer_event(self, monitor, printer, eventname, event): debugprint(repr(monitor) + ": printer `%s' has event `%s'" % (printer, eventname)) def printer_removed(self, monitor, printer): debugprint(repr(monitor) + ": printer `%s' removed" % printer) def cups_connection_error(self, monitor): debugprint(repr(monitor) + ': CUPS connection error') def cups_ipp_error(self, monitor, e, m): debugprint(repr(monitor) + ': CUPS IPP error (%d, %s)' % (e, repr(m))) class Monitor: DBUS_PATH = '/com/redhat/PrinterSpooler' DBUS_IFACE = 'com.redhat.PrinterSpooler' def __init__(self, watcher, bus = None, my_jobs = True, specific_dests = None, monitor_jobs = True, host = None, port = None, encryption = None): self.watcher = watcher self.my_jobs = my_jobs self.specific_dests = specific_dests self.monitor_jobs = monitor_jobs self.jobs = { } self.printer_state_reasons = { } self.printers = set() self.process_pending_events = True self.fetch_jobs_timer = None if host: cups.setServer(host) if port: cups.setPort(port) if encryption: cups.setEncryption(encryption) self.user = cups.getUser() self.host = cups.getServer() self.port = cups.getPort() self.encryption = cups.getEncryption() self.which_jobs = 'not-completed' self.reasons_seen = { } self.connecting_timers = { } self.still_connecting = set() self.connecting_to_device = { } self.received_any_dbus_signals = False self.update_timer = None if bus == None: try: bus = dbus.SystemBus() except dbus.exceptions.DBusException: pass except: None<EXCEPTION MATCH>dbus.exceptions.DBusException None<EXCEPTION MATCH>dbus.exceptions.DBusException if bus != None: bus.add_signal_receiver(self.handle_dbus_signal, path = self.DBUS_PATH, dbus_interface = self.DBUS_IFACE) self.bus = bus self.sub_id = -1 self.refresh() def get_jobs(self): return self.jobs.copy() def cleanup(self): if self.sub_id != -1: user = cups.getUser() try: cups.setUser(self.user) c = cups.Connection(host = self.host, port = self.port, encryption = self.encryption) c.cancelSubscription(self.sub_id) debugprint('Canceled subscription %d' % self.sub_id) except: pass cups.setUser(user) if self.bus != None: self.bus.remove_signal_receiver(self.handle_dbus_signal, path = self.DBUS_PATH, dbus_interface = self.DBUS_IFACE) timers = self.connecting_timers.values() for timer in [ self.update_timer, self.fetch_jobs_timer]: if timer: timers.append(timer) continue for timer in timers: gobject.source_remove(timer) self.watcher.monitor_exited(self) def set_process_pending(self, whether): self.process_pending_events = whether def check_still_connecting(self, printer): '''Timer callback to check on connecting-to-device reasons.''' if not self.process_pending_events: timer = gobject.timeout_add(200, self.check_still_connecting, printer) self.connecting_timers[printer] = timer return False del self.connecting_timers[printer] debugprint("Still-connecting timer fired for `%s'" % printer) (printer_jobs, my_printers) = self.sort_jobs_by_printer() self.update_connecting_devices(printer_jobs) return False def update_connecting_devices(self, printer_jobs = { }): '''Updates connecting_to_device dict and still_connecting set.''' time_now = time.time() connecting_to_device = { } trouble = False for printer, reasons in self.printer_state_reasons.iteritems(): connected = True for reason in reasons: if reason.get_reason() == 'connecting-to-device': have_processing_job = False for job, data in printer_jobs.get(printer, { }).iteritems(): state = data.get('job-state', cups.IPP_JOB_CANCELED) if state == cups.IPP_JOB_PROCESSING: have_processing_job = True break continue if not have_processing_job: debugprint('Ignoring stale connecting-to-device x') continue printer = reason.get_printer() t = self.connecting_to_device.get(printer, time_now) connecting_to_device[printer] = t debugprint('Connecting time: %d' % (time_now - t)) if time_now - t >= CONNECTING_TIMEOUT: if have_processing_job: if printer not in self.still_connecting: self.still_connecting.add(printer) self.watcher.still_connecting(self, reason) if self.connecting_timers.has_key(printer): gobject.source_remove(self.connecting_timers[printer]) del self.connecting_timers[printer] debugprint("Stopped connecting timer for `%s'" % printer) connected = False break continue if connected and self.connecting_timers.has_key(printer): gobject.source_remove(self.connecting_timers[printer]) del self.connecting_timers[printer] debugprint("Stopped connecting timer for `%s'" % printer) continue remove = set() for printer in self.still_connecting: if not connecting_to_device.has_key(printer): remove.add(printer) self.watcher.now_connected(self, printer) if self.connecting_timers.has_key(printer): gobject.source_remove(self.connecting_timers[printer]) del self.connecting_timers[printer] debugprint("Stopped connecting timer for `%s'" % printer) self.connecting_timers.has_key(printer) self.still_connecting = self.still_connecting.difference(remove) self.connecting_to_device = connecting_to_device def check_state_reasons(self, my_printers = set(), printer_jobs = { }): old_reasons_seen_keys = self.reasons_seen.keys() reasons_now = set() for printer, reasons in self.printer_state_reasons.iteritems(): for reason in reasons: tuple = reason.get_tuple() printer = reason.get_printer() reasons_now.add(tuple) if not self.reasons_seen.has_key(tuple): self.watcher.state_reason_added(self, reason) self.reasons_seen[tuple] = reason if reason.get_reason() == 'connecting-to-device' and not self.connecting_to_device.has_key(printer): have_processing_job = False for job, data in printer_jobs.get(printer, { }).iteritems(): state = data.get('job-state', cups.IPP_JOB_CANCELED) if state == cups.IPP_JOB_PROCESSING: have_processing_job = True break continue None if have_processing_job else get_debugging() self.update_connecting_devices(printer_jobs) items = self.reasons_seen.keys() for tuple in items: if tuple not in reasons_now: reason = self.reasons_seen[tuple] del self.reasons_seen[tuple] self.watcher.state_reason_removed(self, reason) continue def get_notifications(self): if not self.process_pending_events: if self.update_timer: gobject.source_remove(self.update_timer) self.update_timer = gobject.timeout_add(200, self.get_notifications) return False debugprint('get_notifications') user = cups.getUser() try: cups.setUser(self.user) c = cups.Connection(host = self.host, port = self.port, encryption = self.encryption) try: try: notifications = c.getNotifications([ self.sub_id], [ self.sub_seq + 1]) except AttributeError: self.process_pending_events self.process_pending_events notifications = c.getNotifications([ self.sub_id]) except: self.process_pending_events except cups.IPPError: self.process_pending_events (e, m) = self.process_pending_events cups.setUser(user) if e == cups.IPP_NOT_FOUND: self.sub_id = -1 self.refresh() return False self.watcher.cups_ipp_error(self, e, m) return True e == cups.IPP_NOT_FOUND except RuntimeError: self.process_pending_events self.process_pending_events cups.setUser(user) self.watcher.cups_connection_error(self) return True cups.setUser(user) deferred_calls = [] jobs = self.jobs.copy() for event in notifications['events']: seq = event['notify-sequence-number'] self.sub_seq = seq nse = event['notify-subscribed-event'] debugprint('%d %s %s' % (seq, nse, event['notify-text'])) if get_debugging(): debugprint(pprint.pformat(event)) if nse.startswith('printer-'): name = event['printer-name'] if nse == 'printer-added' and name not in self.printers: self.printers.add(name) deferred_calls.append((self.watcher.printer_added, (self, name))) continue if nse == 'printer-deleted' and name in self.printers: self.printers.remove(name) items = self.reasons_seen.keys() for tuple in items: if tuple[1] == name: reason = self.reasons_seen[tuple] del self.reasons_seen[tuple] deferred_calls.append((self.watcher.state_reason_removed, (self, reason))) continue if self.printer_state_reasons.has_key(name): del self.printer_state_reasons[name] deferred_calls.append((self.watcher.printer_removed, (self, name))) continue if name in self.printers: printer_state_reasons = event['printer-state-reasons'] reasons = [] for reason in printer_state_reasons: if reason == 'none': break if state_reason_is_harmless(reason): continue reasons.append(StateReason(name, reason)) self.printer_state_reasons[name] = reasons deferred_calls.append((self.watcher.printer_event, (self, name, nse, event))) continue continue jobid = event['notify-job-id'] if (nse == 'job-created' or nse == 'job-state-changed') and not jobs.has_key(jobid) and event['job-state'] == cups.IPP_JOB_PROCESSING: if self.specific_dests != None and event['printer-name'] not in self.specific_dests: continue try: attrs = c.getJobAttributes(jobid) if self.my_jobs and attrs['job-originating-user-name'] != cups.getUser(): continue jobs[jobid] = attrs except AttributeError: jobs[jobid] = { 'job-k-octets': 0 } except cups.IPPError: (e, m) = None self.watcher.cups_ipp_error(self, e, m) jobs[jobid] = { 'job-k-octets': 0 } deferred_calls.append((self.watcher.job_added, (self, jobid, nse, event, jobs[jobid].copy()))) elif (nse == 'job-completed' or nse == 'job-state-changed') and event['job-state'] == cups.IPP_JOB_COMPLETED: if self.which_jobs not in ('completed', 'all'): try: del jobs[jobid] deferred_calls.append((self.watcher.job_removed, (self, jobid, nse, event))) continue except KeyError: continue try: job = jobs[jobid] except KeyError: continue for attribute in [ 'job-state', 'job-name']: job[attribute] = event[attribute] if event.has_key('notify-printer-uri'): job['job-printer-uri'] = event['notify-printer-uri'] deferred_calls.append((self.watcher.job_event, (self, jobid, nse, event, job.copy()))) self.set_process_pending(False) self.update_jobs(jobs) self.jobs = jobs for fn, args in deferred_calls: fn(*args) self.set_process_pending(True) if not self.received_any_dbus_signals: if self.update_timer: gobject.source_remove(self.update_timer) interval = 1000 * notifications['notify-get-interval'] self.update_timer = gobject.timeout_add(interval, self.get_notifications) return False def refresh(self, which_jobs = None, refresh_all = True): debugprint('refresh') if which_jobs != None: self.which_jobs = which_jobs user = cups.getUser() try: cups.setUser(self.user) c = cups.Connection(host = self.host, port = self.port, encryption = self.encryption) except RuntimeError: self.watcher.cups_connection_error(self) cups.setUser(user) return None if self.sub_id != -1: try: c.cancelSubscription(self.sub_id) except cups.IPPError: (e, m) = None self.watcher.cups_ipp_error(self, e, m) if self.update_timer: gobject.source_remove(self.update_timer) debugprint('Canceled subscription %d' % self.sub_id) try: del self.sub_seq except AttributeError: pass events = [ 'printer-added', 'printer-deleted', 'printer-state-changed'] if self.monitor_jobs: events.extend([ 'job-created', 'job-completed', 'job-stopped', 'job-state-changed']) try: self.sub_id = c.createSubscription('/', events = events) except cups.IPPError: (e, m) = None self.watcher.cups_ipp_error(self, e, m) cups.setUser(user) self.update_timer = gobject.timeout_add(MIN_REFRESH_INTERVAL * 1000, self.get_notifications) debugprint('Created subscription %d' % self.sub_id) if self.monitor_jobs: jobs = self.jobs.copy() if self.which_jobs not in ('all', 'completed'): filtered = { } for jobid, job in jobs.iteritems(): if job['job-state'] < cups.IPP_JOB_CANCELED: filtered[jobid] = job continue jobs = filtered self.fetch_first_job_id = 1 if self.fetch_jobs_timer: gobject.source_remove(self.fetch_jobs_timer) self.fetch_jobs_timer = gobject.timeout_add(5, self.fetch_jobs, refresh_all) else: jobs = { } try: self.printer_state_reasons = collect_printer_state_reasons(c) dests = c.getPrinters() self.printers = set(dests.keys()) except cups.IPPError: (e, m) = None self.watcher.cups_ipp_error(self, e, m) return None except RuntimeError: self.watcher.cups_connection_error(self) return None elif self.specific_dests != None: for jobid in jobs.keys(): uri = jobs[jobid].get('job-printer-uri', '/') i = uri.rfind('/') printer = uri[i + 1:] if printer not in self.specific_dests: del jobs[jobid] continue self.set_process_pending(False) self.watcher.current_printers_and_jobs(self, self.printers.copy(), jobs.copy()) self.update_jobs(jobs) self.jobs = jobs self.set_process_pending(True) return False def fetch_jobs(self, refresh_all): if not self.process_pending_events: return True user = cups.getUser() try: cups.setUser(self.user) c = cups.Connection(host = self.host, port = self.port, encryption = self.encryption) except RuntimeError: self.process_pending_events self.process_pending_events self.watcher.cups_connection_error(self) self.fetch_jobs_timer = None cups.setUser(user) return False limit = 1 try: fetched = c.getJobs(which_jobs = self.which_jobs, my_jobs = self.my_jobs, first_job_id = self.fetch_first_job_id, limit = limit) except cups.IPPError: self.process_pending_events (e, m) = self.process_pending_events self.watcher.cups_ipp_error(self, e, m) self.fetch_jobs_timer = None cups.setUser(user) return False cups.setUser(user) got = len(fetched) debugprint('Got %s jobs, asked for %s' % (got, limit)) deferred_calls = [] jobs = self.jobs.copy() jobids = fetched.keys() jobids.sort() if got > 0: last_jobid = jobids[got - 1] else: last_jobid = self.fetch_first_job_id + limit for jobid in xrange(self.fetch_first_job_id, last_jobid + 1): try: job = fetched[jobid] if self.specific_dests != None: uri = job.get('job-printer-uri', '/') i = uri.rfind('/') printer = uri[i + 1:] if printer not in self.specific_dests: raise KeyError printer not in self.specific_dests if jobs.has_key(jobid): fn = self.watcher.job_event else: fn = self.watcher.job_added jobs[jobid] = job deferred_calls.append((fn, (self, jobid, '', { }, job.copy()))) continue except KeyError: if jobs.has_key(jobid): del jobs[jobid] deferred_calls.append((self.watcher.job_removed, (self, jobid, '', { }))) jobs.has_key(jobid) jobids = jobs.keys() jobids.sort() if got < limit: trim = False for i in range(len(jobids)): jobid = jobids[i] if not trim and jobid > last_jobid: trim = True if trim: del jobs[jobid] deferred_calls.append((self.watcher.job_removed, (self, jobid, '', { }))) continue self.update_jobs(jobs) self.jobs = jobs for fn, args in deferred_calls: fn(*args) if got < limit: self.fetch_jobs_timer = None return False next = jobid + 1 while not refresh_all and self.jobs.has_key(next): next += 1 continue got < limit self.fetch_first_job_id = next return True def sort_jobs_by_printer(self, jobs = None): if jobs == None: jobs = self.jobs my_printers = set() printer_jobs = { } for job, data in jobs.iteritems(): state = data.get('job-state', cups.IPP_JOB_CANCELED) if state >= cups.IPP_JOB_CANCELED: continue uri = data.get('job-printer-uri', '') i = uri.rfind('/') if i == -1: continue printer = uri[i + 1:] my_printers.add(printer) if not printer_jobs.has_key(printer): printer_jobs[printer] = { } printer_jobs[printer][job] = data return (printer_jobs, my_printers) def update_jobs(self, jobs): debugprint('update_jobs') (printer_jobs, my_printers) = self.sort_jobs_by_printer(jobs) self.check_state_reasons(my_printers, printer_jobs) def update(self): if self.update_timer: gobject.source_remove(self.update_timer) self.update_timer = gobject.timeout_add(200, self.get_notifications) def handle_dbus_signal(self, *args): self.update() if not self.received_any_dbus_signals: self.received_any_dbus_signals = True if __name__ == '__main__': set_debugging(True) m = Monitor(Watcher()) loop = gobject.MainLoop() try: loop.run() finally: m.cleanup()